-
Notifications
You must be signed in to change notification settings - Fork 5
feat: implement snapshot bootstrapper with cbor download + omnibus ch… #396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
# Conflicts: # modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements a snapshot bootstrapper module that enables Acropolis to initialize from NewEpochState CBOR snapshot files. The implementation adds an alternative bootstrap path alongside the existing genesis bootstrapper, allowing users to choose between starting from genesis or from a pre-built snapshot. The snapshot bootstrapper downloads gzip-compressed CBOR files from configured URLs, decompresses them on-the-fly, and parses them to reconstruct blockchain state.
Key changes:
- New
SnapshotBootstrappermodule with download orchestration and gzip decompression - Dynamic bootstrap module selection based on
startup.methodconfig ("genesis" or "snapshot") - Configuration structure for network-specific snapshots with metadata in JSON files
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| processes/omnibus/src/main.rs | Adds dynamic bootstrap module registration based on startup.method config |
| processes/omnibus/omnibus.toml | Reorganizes config structure and adds snapshot bootstrapper configuration section |
| processes/omnibus/Cargo.toml | Adds dependency on the new snapshot_bootstrapper module |
| modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs | Implements core snapshot download, decompression, and parsing logic with error handling |
| modules/snapshot_bootstrapper/data/mainnet/snapshots.json | Defines metadata for available mainnet snapshots (epochs 507-509) |
| modules/snapshot_bootstrapper/data/mainnet/config.json | Specifies which snapshots to load and associated block points |
| modules/snapshot_bootstrapper/Cargo.toml | Declares dependencies for HTTP client, compression, and JSON parsing |
| common/src/snapshot/streaming_snapshot.rs | Adjusts logging format for consistency (removes emojis and extra indentation) |
| Cargo.lock | Updates lock file with new dependencies |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Self::download_snapshot(&metadata.url, file_path).await?; | ||
| info!("Downloaded: {}", file_path); | ||
| Ok(()) | ||
| } | ||
|
|
||
| async fn download_snapshot(url: &str, output_path: &str) -> Result<(), SnapshotBootstrapError> { | ||
| if let Some(parent) = Path::new(output_path).parent() { | ||
| tokio::fs::create_dir_all(parent) | ||
| .await | ||
| .map_err(|e| SnapshotBootstrapError::CreateDirectory(parent.to_path_buf(), e))?; | ||
| } | ||
|
|
||
| let client = reqwest::Client::new(); |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A new reqwest::Client is created for each download. Consider reusing a client instance or accepting it as a parameter, as creating a new client for each request is inefficient and may consume unnecessary resources. The client can be created once and reused for all downloads.
| Self::download_snapshot(&metadata.url, file_path).await?; | |
| info!("Downloaded: {}", file_path); | |
| Ok(()) | |
| } | |
| async fn download_snapshot(url: &str, output_path: &str) -> Result<(), SnapshotBootstrapError> { | |
| if let Some(parent) = Path::new(output_path).parent() { | |
| tokio::fs::create_dir_all(parent) | |
| .await | |
| .map_err(|e| SnapshotBootstrapError::CreateDirectory(parent.to_path_buf(), e))?; | |
| } | |
| let client = reqwest::Client::new(); | |
| let client = reqwest::Client::new(); | |
| Self::download_snapshot(&client, &metadata.url, file_path).await?; | |
| info!("Downloaded: {}", file_path); | |
| Ok(()) | |
| } | |
| async fn download_snapshot( | |
| client: &reqwest::Client, | |
| url: &str, | |
| output_path: &str, | |
| ) -> Result<(), SnapshotBootstrapError> { | |
| if let Some(parent) = Path::new(output_path).parent() { | |
| tokio::fs::create_dir_all(parent) | |
| .await | |
| .map_err(|e| SnapshotBootstrapError::CreateDirectory(parent.to_path_buf(), e))?; | |
| } |
| points: Vec<Point>, | ||
| } | ||
|
|
||
| /// Point | ||
| #[derive(Debug, Deserialize, Serialize)] | ||
| #[serde(rename_all = "camelCase")] | ||
| struct Point { | ||
| epoch: u64, | ||
| id: String, | ||
| slot: u64, | ||
| } | ||
|
|
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Point struct is only used as part of NetworkConfig.points which is never accessed. If the points field is not needed, this struct definition can be removed as well.
| points: Vec<Point>, | |
| } | |
| /// Point | |
| #[derive(Debug, Deserialize, Serialize)] | |
| #[serde(rename_all = "camelCase")] | |
| struct Point { | |
| epoch: u64, | |
| id: String, | |
| slot: u64, | |
| } | |
| } |
| for snapshot_meta in &target_snapshots { | ||
| let filename = format!("{}.cbor", snapshot_meta.point); | ||
| let file_path = format!("{}/{}", network_dir, filename); | ||
|
|
||
| if let Err(e) = | ||
| Self::ensure_snapshot_downloaded(&file_path, snapshot_meta).await | ||
| { | ||
| error!("Failed to download snapshot: {}", e); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| for snapshot_meta in target_snapshots { | ||
| let filename = format!("{}.cbor", snapshot_meta.point); | ||
| let file_path = format!("{}/{}", network_dir, filename); |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The filename construction logic is duplicated on lines 361-362 and 373-374. Consider extracting this into a helper function or reusing the computed paths to avoid code duplication.
| async fn download_snapshot(url: &str, output_path: &str) -> Result<(), SnapshotBootstrapError> { | ||
| if let Some(parent) = Path::new(output_path).parent() { | ||
| tokio::fs::create_dir_all(parent) | ||
| .await | ||
| .map_err(|e| SnapshotBootstrapError::CreateDirectory(parent.to_path_buf(), e))?; | ||
| } | ||
|
|
||
| let client = reqwest::Client::new(); | ||
| let response = client | ||
| .get(url) | ||
| .send() | ||
| .await | ||
| .map_err(|e| SnapshotBootstrapError::DownloadError(url.to_string(), e))?; | ||
|
|
||
| if !response.status().is_success() { | ||
| return Err(SnapshotBootstrapError::DownloadInvalidStatusCode( | ||
| url.to_string(), | ||
| response.status(), | ||
| )); | ||
| } | ||
|
|
||
| let total_size = response.content_length().unwrap_or(0); | ||
| if total_size > 0 { | ||
| info!("Downloading {} MB (compressed)...", total_size / 1_000_000); | ||
| } | ||
|
|
||
| let tmp_path = Path::new(output_path).with_extension("partial"); | ||
| let mut file = File::create(&tmp_path).await?; | ||
|
|
||
| let raw_stream_reader = | ||
| StreamReader::new(response.bytes_stream().map_err(io::Error::other)); | ||
| let buffered_reader = BufReader::new(raw_stream_reader); | ||
| let mut decoded_stream = GzipDecoder::new(buffered_reader); | ||
|
|
||
| tokio::io::copy(&mut decoded_stream, &mut file).await?; | ||
| file.sync_all().await?; | ||
| tokio::fs::rename(&tmp_path, output_path).await?; | ||
|
|
||
| Ok(()) | ||
| } |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The download function does not validate the integrity of downloaded snapshots (e.g., via checksums or signatures). Consider adding integrity verification to ensure the downloaded data hasn't been tampered with, especially since these snapshots will be used to bootstrap the system state. The SnapshotFileMetadata could be extended to include a checksum field.
|
|
||
| // Get startup method from config | ||
| let startup_method = | ||
| config.get_string("startup.method").unwrap_or_else(|_| "snapshot".to_string()); |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The default value for startup.method is "snapshot", but the config file omnibus.toml sets it to "genesis". This inconsistency could cause confusion. Consider aligning the default value with the most common use case, or document why "snapshot" is the fallback default when the existing config file explicitly sets "genesis".
| config.get_string("startup.method").unwrap_or_else(|_| "snapshot".to_string()); | |
| config.get_string("startup.method").unwrap_or_else(|_| "genesis".to_string()); |
| let mut file = File::create(&tmp_path).await?; | ||
|
|
||
| let raw_stream_reader = | ||
| StreamReader::new(response.bytes_stream().map_err(io::Error::other)); | ||
| let buffered_reader = BufReader::new(raw_stream_reader); | ||
| let mut decoded_stream = GzipDecoder::new(buffered_reader); | ||
|
|
||
| tokio::io::copy(&mut decoded_stream, &mut file).await?; | ||
| file.sync_all().await?; | ||
| tokio::fs::rename(&tmp_path, output_path).await?; | ||
|
|
||
| Ok(()) |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the download or decompression fails after creating the .partial file (lines 472-481), the partial file is not cleaned up. This could leave corrupted partial files on disk. Consider adding error handling to ensure the temporary file is removed on failure. For example:
let result = async {
let mut file = File::create(&tmp_path).await?;
// ... download and decompress ...
tokio::fs::rename(&tmp_path, output_path).await?;
Ok(())
}.await;
if result.is_err() {
let _ = tokio::fs::remove_file(&tmp_path).await;
}
result| let mut file = File::create(&tmp_path).await?; | |
| let raw_stream_reader = | |
| StreamReader::new(response.bytes_stream().map_err(io::Error::other)); | |
| let buffered_reader = BufReader::new(raw_stream_reader); | |
| let mut decoded_stream = GzipDecoder::new(buffered_reader); | |
| tokio::io::copy(&mut decoded_stream, &mut file).await?; | |
| file.sync_all().await?; | |
| tokio::fs::rename(&tmp_path, output_path).await?; | |
| Ok(()) | |
| let result = async { | |
| let mut file = File::create(&tmp_path).await?; | |
| let raw_stream_reader = | |
| StreamReader::new(response.bytes_stream().map_err(io::Error::other)); | |
| let buffered_reader = BufReader::new(raw_stream_reader); | |
| let mut decoded_stream = GzipDecoder::new(buffered_reader); | |
| tokio::io::copy(&mut decoded_stream, &mut file).await?; | |
| file.sync_all().await?; | |
| tokio::fs::rename(&tmp_path, output_path).await?; | |
| Ok(()) | |
| }.await; | |
| if result.is_err() { | |
| let _ = tokio::fs::remove_file(&tmp_path).await; | |
| } | |
| result |
|
|
||
| let completion_topic = | ||
| config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); | ||
| info!("Publishing snapshots on '{snapshot_topic}'"); |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log message on line 296 duplicates the information from line 301. Consider removing line 296 to avoid redundant logging.
| info!("Publishing snapshots on '{snapshot_topic}'"); |
| info!("Downloaded: {}", file_path); | ||
| Ok(()) | ||
| } | ||
|
|
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function lacks documentation. Consider adding a doc comment explaining that it downloads a gzip-compressed snapshot from the given URL, decompresses it on-the-fly, and saves the decompressed CBOR data to the output path using a .partial temporary file.
| /// Downloads a gzip-compressed snapshot from the given URL, decompresses it on-the-fly, | |
| /// and saves the decompressed CBOR data to the specified output path. | |
| /// The data is first written to a `.partial` temporary file to ensure atomicity, | |
| /// and then renamed to the final output path upon successful completion. | |
| /// | |
| /// # Arguments | |
| /// * `url` - The URL of the gzip-compressed snapshot to download. | |
| /// * `output_path` - The file path where the decompressed CBOR data will be saved. | |
| /// | |
| /// # Errors | |
| /// Returns a `SnapshotBootstrapError` if any IO, network, or decompression error occurs. |
| } | ||
| } | ||
|
|
||
| impl SnapshotBootstrapper { |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider adding documentation to the init function explaining the configuration parameters it accepts (network, data-dir, startup-topic, snapshot-topic, completion-topic) and the expected directory structure (data-dir/network/config.json and snapshots.json).
| impl SnapshotBootstrapper { | |
| impl SnapshotBootstrapper { | |
| /// Initializes the snapshot bootstrapper. | |
| /// | |
| /// # Configuration Parameters | |
| /// - `network` (string): The network to use (e.g., "mainnet", "testnet"). Defaults to `"mainnet"` if not specified. | |
| /// - `data-dir` (string): The base directory for data files. Defaults to `"./data"` if not specified. | |
| /// - `startup-topic` (string): The topic to subscribe to for startup messages. Defaults to `DEFAULT_STARTUP_TOPIC` if not specified. | |
| /// - `snapshot-topic` (string): The topic to publish snapshot data to. Defaults to `DEFAULT_SNAPSHOT_TOPIC` if not specified. | |
| /// - `completion-topic` (string): The topic to publish completion notifications to. Defaults to `DEFAULT_COMPLETION_TOPIC` if not specified. | |
| /// | |
| /// # Expected Directory Structure | |
| /// The following files are expected to exist: | |
| /// - `{data-dir}/{network}/config.json`: The configuration file for the specified network. | |
| /// - `{data-dir}/{network}/snapshots.json`: The snapshot metadata file. | |
| /// | |
| /// # Arguments | |
| /// - `context`: The application context for message subscription and publishing. | |
| /// - `config`: The configuration object containing the parameters above. | |
| /// | |
| /// # Returns | |
| /// Returns `Ok(())` if initialization succeeds, or an error otherwise. |
| // read headers | ||
| // read and process ALL of the snapshot files, not just one. | ||
|
|
||
| let span = info_span!("snapshot_bootstrapper.handle"); |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code below (lines 319-387) reads config files and processes all snapshot files, which addresses some of the tasks mentioned in the TODO comment above (lines 311-315). Consider updating or removing that TODO comment to reflect what has been implemented and what remains (nonces and headers).
Description
Adds support for bootstrapping Acropolis from NewEpochState CBOR snapshot files. Downloads and parses snapshots from configured URLs.
Related Issue(s)
#395
How was this tested?
Checklist
Impact / Side effects
No breaking changes. This is an additive change.
New optional config:
Dependencies added:
async-compression,futures-util,reqwest,tokio-utilReviewer notes / Areas to focus
snapshot_bootstrapper.rs: Download orchestration and gzip decompressionmain.rs: Two-line addition (import + register)